﻿#include <thread>
#include <condition_variable>
#include <list>
#include "threadpool.h"

// ThreadJob implementation that wraps a std::function<void()> so that a plain
// callable can be scheduled like any other job.
class FunctionalJob:public ThreadJob
{
public:
	// Stores a copy of `fun` and forwards the dependency list `relys` to the
	// ThreadJob base class.
	FunctionalJob(std::function<void()> fun, const ThreadJobList& relys)
		: ThreadJob(relys), fun(fun)
	{
	}

	// Invokes the stored callable; an empty std::function is silently skipped.
	void Run() override {
		if (!fun) {
			return;
		}
		fun();
	}

	// The callable executed by Run(); may be empty.
	std::function<void()> fun;
};


// Factory helper: builds a FunctionalJob around `fun` with the dependency
// list `relys` and returns it as a ThreadJobPtr.
ThreadJobPtr ThreadJob::CreateJob(std::function<void()> fun, const ThreadJobList& relys) {
	ThreadJobPtr created(new FunctionalJob(fun, relys));
	return created;
}

// Creates a job that has not run yet and remembers the jobs it depends on.
ThreadJob::ThreadJob(const ThreadJobList & relys)
	: m_finished(false)
	, relys(relys)
{
}

bool ThreadJob::isFinished() const
{
	return m_finished;
}

// Bookkeeping record for one worker thread of the pool.
struct ThreadInfo {
	// True while the worker is executing a job.
	bool bBussy{false};
	// Handle of the underlying thread; empty until a thread is attached.
	std::shared_ptr<std::thread> m_thread;
};

// Shared handle to a ThreadInfo record.
using ThreadInfoPtr = std::shared_ptr<ThreadInfo>;

// Internal state of ThreadPool: the worker threads, the queue of pending jobs
// and the mutex / condition variable that guard them.
//
// Locking: m_queue, m_threads, isShutted, every ThreadInfo::bBussy flag and
// the m_finished flag of every job are only read or written while m_mutex is
// held. The worker loop relies on this so that a thread that found nothing
// runnable cannot miss the notification that would make a job runnable.
struct ThreadPool::Private
{

	// Starts one worker thread running Loop() and records it in m_threads.
	// The ThreadInfo is captured by value, so the record stays alive for as
	// long as the worker runs.
	void AllocThread() {
		ThreadInfoPtr info(new ThreadInfo);
		info->m_thread.reset(new std::thread([this, info]() {
			Loop(info);
			}));

		m_threads.push_back(info);
	}

	// Tries to grow the pool by one worker.
	// Returns false when the policy is Fixed, or when the policy is
	// IncreseOnNeed and the pool already holds MaxThreadCount (or more)
	// workers; otherwise allocates a worker and returns true.
	bool TryAllocThread() {
		if (policy == IncrementPolicy::Fixed) {
			return false;
		}
		if (policy == IncrementPolicy::IncreseOnNeed) {
			// ">=" rather than "==": the initial thread count may already
			// exceed the limit, and "==" would then never stop the growth.
			const auto limit =
				static_cast<std::vector<ThreadInfoPtr>::size_type>(MaxThreadCount);
			if (m_threads.size() >= limit) {
				return false;
			}
		}
		AllocThread();
		return true;
	}

	// Returns true when at least one registered worker is not flagged busy.
	bool AvaliableThreads() {
		for (const auto& worker : m_threads) {
			if (!worker->bBussy) {
				return true;
			}
		}
		return false;
	}

	// Searches the queue for the first job whose dependencies have all
	// finished, removes it from the queue and returns it.
	// Returns a null pointer when no queued job is runnable.
	ThreadJobPtr AcquireJob() {
		for (auto it = m_queue.begin(); it != m_queue.end(); ++it) {
			ThreadJobPtr candidate = *it;
			bool runnable = true;
			for (const auto& dependency : candidate->relys) {
				if (!dependency->isFinished()) {
					runnable = false;
					break;
				}
			}

			if (runnable) {
				m_queue.erase(it);
				return candidate;
			}
		}
		return nullptr;
	}

	// Worker thread body: repeatedly waits for a runnable job, executes it and
	// wakes the other workers. Returns once isShutted has been set.
	void Loop(ThreadInfoPtr info) {

		while (true)
		{
			ThreadJobPtr job;
			bool shouldRun = false;

			{
				std::unique_lock<std::mutex> lock(m_mutex);

				while (true)
				{
					// isShutted is only inspected with the mutex held; shut()
					// writes it under the same mutex.
					if (isShutted) {
						return;
					}

					job = AcquireJob();
					if (job != nullptr) {
						break;
					}
					m_cvar.wait(lock);
				}

				// Flag the worker busy before releasing the mutex so that
				// AvaliableThreads() never counts a thread that already owns a
				// job as idle.
				shouldRun = !job->isFinished();
				if (shouldRun) {
					info->bBussy = true;
				}
			}

			if (shouldRun)
			{
				job->Run();
				job->relys.clear();// drop the dependencies once the job has run to release resources
			}

			{
				// Publish completion under the mutex: a waiter evaluates
				// AcquireJob() with the mutex held, so it either sees the
				// finished flag or is already waiting when notify_all() runs.
				std::lock_guard<std::mutex> lock(m_mutex);
				if (shouldRun) {
					job->m_finished = true;
					info->bBussy = false;
				}
			}
			// Finishing one job may unblock several dependent jobs at once, so
			// wake every worker instead of a single one.
			m_cvar.notify_all();
		}
	}

	// Growth policy consulted by TryAllocThread(); assigned by the ThreadPool constructor.
	IncrementPolicy policy;
	// Upper bound on the number of workers under the IncreseOnNeed policy.
	int MaxThreadCount = 0;
	// All workers started so far.
	std::vector<ThreadInfoPtr> m_threads;

	// Jobs posted but not yet handed to a worker.
	std::list<ThreadJobPtr> m_queue;
	// Guards the shared state listed in the struct comment.
	std::mutex m_mutex;
	// Signalled when jobs are posted, when a job finishes and on join().
	std::condition_variable m_cvar;

	// Set by shut(); makes workers leave Loop() and PostJob() reject new jobs.
	bool isShutted = false;
	// Not read or written by any member function of this struct.
	bool isFinished = false;
};

// Builds the pool: stores the growth policy and the thread limit, then starts
// `initThreadCount` workers. `initThreadCount` must not be negative (checked
// by assert only).
ThreadPool::ThreadPool(int initThreadCount, int MaxThreadCount, IncrementPolicy policy):_P(new Private)
{
	assert(initThreadCount >= 0);

	// Configure the pool before the first worker exists.
	_P->policy = policy;
	_P->MaxThreadCount = MaxThreadCount;

	for (int remaining = initThreadCount; remaining > 0; --remaining) {
		_P->AllocThread();
	}
}

// Shuts the pool down and waits for every worker before the object goes away.
// NOTE(review): _P is created with `new Private` in the constructor; confirm
// in the header that its type releases the allocation.
ThreadPool::~ThreadPool()
{
	this->stop();
}

// Convenience overload: posts a single job by wrapping it in a one-element
// job list and delegating to the list overload.
void ThreadPool::PostJob(ThreadJobPtr job)
{
	const ThreadJobList single{job};
	PostJob(single);
}

// Appends every non-null job in `jobs` to the pending queue and wakes workers
// to process them.
//
// Nothing happens when `jobs` is empty, when it holds only null pointers, or
// when the pool has already been shut. Null entries are skipped because the
// worker dereferences every queued job.
void ThreadPool::PostJob(const ThreadJobList & jobs)
{
	if (jobs.empty()) {
		return;
	}
	std::unique_lock<std::mutex> lock(_P->m_mutex);

	if (_P->isShutted) {
		return;
	}

	unsigned int posted = 0;
	for (const auto &job : jobs) {
		if (job == nullptr) {
			continue;
		}
		_P->m_queue.push_back(job);
		++posted;
	}
	if (posted == 0) {
		return;
	}

	// When no worker is idle, try to start an additional one.
	if (!_P->AvaliableThreads()) {
		_P->TryAllocThread();
	}

	// Several new jobs may keep several workers busy; a single job needs only
	// one worker to wake up.
	if (posted > 1) {
		_P->m_cvar.notify_all();
	}
	else {
		_P->m_cvar.notify_one();
	}
}

// Marks the pool as shut and discards every job still waiting in the queue.
// Does not wake or wait for the workers; join() does that.
void ThreadPool::shut()
{
	std::lock_guard<std::mutex> guard(_P->m_mutex);
	_P->isShutted = true;
	_P->m_queue.clear();
}

void ThreadPool::join()
{
	std::vector<ThreadInfoPtr> threads;
	{
		std::unique_lock<std::mutex> lock(_P->m_mutex);
		_P->m_cvar.notify_all();
		threads = std::move(_P->m_threads);
	}
	for (auto i : threads)
	{
		i->m_thread->join();
	}
}

void ThreadPool::stop()
{
	shut();
	join();
}
